{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Working with Streaming Data\n",
    "\n",
    "Learning Objectives\n",
    " 1. Learn how to process real-time data for ML models using Cloud Dataflow\n",
    " 2. Learn how to serve online predictions using real-time data\n",
    "\n",
    "## Introduction\n",
    "\n",
    "It can be useful to leverage real time data in a machine learning model when making a prediction. However, doing so requires setting up a streaming data pipeline which can be non-trivial. \n",
    "\n",
    "Typically you will have the following:\n",
    " - A series of IoT devices generating and sending data from the field in real-time (in our case these are the taxis)\n",
    " - A messaging bus to that receives and temporarily stores the IoT data (in our case this is Cloud Pub/Sub)\n",
    " - A streaming processing service that subscribes to the messaging bus, windows the messages and performs data transformations on each window (in our case this is Cloud Dataflow)\n",
    " - A persistent store to keep the processed data (in our case this is BigQuery)\n",
    "\n",
    "These steps happen continuously and in real-time, and are illustrated by the blue arrows in the diagram below. \n",
    "\n",
    "Once this streaming data pipeline is established, we need to modify our model serving to leverage it. This simply means adding a call to the persistent store (BigQuery) to fetch the latest real-time data when a prediction request comes in. This flow is illustrated by the red arrows in the diagram below. \n",
    "\n",
    "<img src='../assets/taxi_streaming_data.png' width='80%'>\n",
    "\n",
    "\n",
    "In this lab we will address how to process real-time data for machine learning models. We will use the same data as our previous 'taxifare' labs, but with the addition of `trips_last_5min` data as an additional feature. This is our proxy for real-time traffic.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import os\n",
    "import shutil\n",
    "import tensorflow as tf\n",
    "\n",
    "from google.cloud import aiplatform\n",
    "from google.cloud import bigquery\n",
    "from google.protobuf import json_format\n",
    "from google.protobuf.struct_pb2 import Value\n",
    "from matplotlib import pyplot as plt\n",
    "from tensorflow import keras\n",
    "from tensorflow.keras.callbacks import TensorBoard\n",
    "from tensorflow.keras.layers import Dense, DenseFeatures\n",
    "from tensorflow.keras.models import Sequential\n",
    "\n",
    "print(tf.__version__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "PROJECT = 'cloud-training-demos' # REPLACE WITH YOUR PROJECT ID\n",
    "BUCKET = 'cloud-training-demos' # REPLACE WITH YOUR BUCKET NAME\n",
    "REGION = 'us-central1' # REPLACE WITH YOUR BUCKET REGION e.g. us-central1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# For Bash Code\n",
    "os.environ['PROJECT'] = PROJECT\n",
    "os.environ['BUCKET'] = BUCKET\n",
    "os.environ['REGION'] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "gcloud config set project $PROJECT"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Re-train our model with `trips_last_5min` feature\n",
    "\n",
    "In this lab, we want to show how to process real-time data for training and prediction. So, we need to retrain our previous model with this additional feature. Go through the notebook `4a_streaming_data_training.ipynb`. Open and run the notebook to train and save a model. This notebook is very similar to what we did in the Introduction to Tensorflow module but note the added feature for `trips_last_5min` in the model and the dataset."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Simulate Real Time Taxi Data\n",
    "\n",
    "Since we don’t actually have real-time taxi data we will synthesize it using a simple python script. The script publishes events to Google Cloud Pub/Sub.\n",
    "\n",
    "Inspect the `iot_devices.py` script in the `taxicab_traffic` folder. It is configured to send about 2,000 trip messages every five minutes with some randomness in the frequency to mimic traffic fluctuations. These numbers come from looking at the historical average of taxi ride frequency in BigQuery. \n",
    "\n",
    "In production this script would be replaced with actual taxis with IoT devices sending trip data to Cloud Pub/Sub. \n",
    "\n",
    "To execute the `iot_devices.py` script, launch a terminal and navigate to the `asl-ml-immersion/notebooks/building_production_ml_systems/solutions` directory. Then run the following two commands."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```bash\n",
    "PROJECT_ID=$(gcloud config list project --format \"value(core.project)\")\n",
    "python3 ./taxicab_traffic/iot_devices.py --project=$PROJECT_ID\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You will see new messages being published every 5 seconds. **Keep this terminal open** so it continues to publish events to the Pub/Sub topic. If you open [Pub/Sub in your Google Cloud Console](https://console.cloud.google.com/cloudpubsub/topic/list), you should be able to see a topic called `taxi_rides`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a BigQuery table to collect the processed data\n",
    "\n",
    "In the next section, we will create a dataflow pipeline to write processed taxifare data to a BigQuery Table, however that table does not yet exist. Execute the following commands to create a BigQuery dataset called `taxifare` and a table within that dataset called `traffic_realtime`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "bq = bigquery.Client()\n",
    "\n",
    "dataset = bigquery.Dataset(bq.dataset(\"taxifare\"))\n",
    "try:\n",
    "    bq.create_dataset(dataset)  # will fail if dataset already exists\n",
    "    print(\"Dataset created.\")\n",
    "except:\n",
    "    print(\"Dataset already exists.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we create a table called `traffic_realtime` and set up the schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = bigquery.Dataset(bq.dataset(\"taxifare\"))\n",
    "\n",
    "table_ref = dataset.table(\"traffic_realtime\")\n",
    "SCHEMA = [\n",
    "    bigquery.SchemaField(\"trips_last_5min\", \"INTEGER\", mode=\"REQUIRED\"),\n",
    "    bigquery.SchemaField(\"time\", \"TIMESTAMP\", mode=\"REQUIRED\"),\n",
    "]\n",
    "table = bigquery.Table(table_ref, schema=SCHEMA)\n",
    "\n",
    "try:\n",
    "    bq.create_table(table)\n",
    "    print(\"Table created.\")\n",
    "except:\n",
    "    print(\"Table already exists.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Launch Streaming Dataflow Pipeline\n",
    "\n",
    "Now that we have our taxi data being pushed to Pub/Sub, and our BigQuery table set up, let’s consume the Pub/Sub data using a streaming DataFlow pipeline.\n",
    "\n",
    "The pipeline is defined in `./taxicab_traffic/streaming_count.py`. Open that file and inspect it. \n",
    "\n",
    "There are 5 transformations being applied:\n",
    " - Read from PubSub\n",
    " - Window the messages\n",
    " - Count number of messages in the window\n",
    " - Format the count for BigQuery\n",
    " - Write results to BigQuery\n",
    "\n",
    "**TODO:** Open the file ./taxicab_traffic/streaming_count.py and find the TODO there. Specify a sliding window that is 5 minutes long, and gets recalculated every 15 seconds. Hint: Reference the [beam programming guide](https://beam.apache.org/documentation/programming-guide/#windowing) for guidance. To check your answer reference the solution. \n",
    "\n",
    "For the second transform, we specify a sliding window that is 5 minutes long, and recalculate values every 15 seconds. \n",
    "\n",
    "In a new terminal, launch the dataflow pipeline using the command below. You can change the `BUCKET` variable, if necessary. Here it is assumed to be your `PROJECT_ID`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```bash\n",
    "PROJECT_ID=$(gcloud config list project --format \"value(core.project)\")\n",
    "BUCKET=$PROJECT_ID # CHANGE AS NECESSARY \n",
    "python3 ./taxicab_traffic/streaming_count.py \\\n",
    "    --input_topic taxi_rides \\\n",
    "    --runner=DataflowRunner \\\n",
    "    --project=$PROJECT_ID \\\n",
    "    --temp_location=gs://$BUCKET/dataflow_streaming\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once you've submitted the command above you can examine the progress of that job in the [Dataflow section of Cloud console](https://console.cloud.google.com/dataflow). "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Explore the data in the table"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After a few moments, you should also see new data written to your BigQuery table as well. \n",
    "\n",
    "Re-run the query periodically to observe new data streaming in! You should see a new row every 15 seconds. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bigquery\n",
    "SELECT\n",
    "  *\n",
    "FROM\n",
    "  `taxifare.traffic_realtime`\n",
    "ORDER BY\n",
    "  time DESC\n",
    "LIMIT 10"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Make predictions from the new data\n",
    "\n",
    "In the rest of the lab, we'll referece the model we trained and deployed from the previous labs, so make sure you have run the code in the `4a_streaming_data_training.ipynb` notebook. \n",
    "\n",
    "The `add_traffic_last_5min` function below will query the `traffic_realtime` table to find the most recent traffic information and add that feature to our instance for prediction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO 2a. Write a function to take most recent entry in `traffic_realtime` table and add it to instance.\n",
    "def add_traffic_last_5min(instance):\n",
    "    bq = bigquery.Client()\n",
    "    query_string = \"\"\"\n",
    "    SELECT\n",
    "      *\n",
    "    FROM\n",
    "      `taxifare.traffic_realtime`\n",
    "    ORDER BY\n",
    "      time DESC\n",
    "    LIMIT 1\n",
    "    \"\"\"\n",
    "    trips = bq.query(query_string).to_dataframe()['trips_last_5min'][0]\n",
    "    instance['traffic_last_5min'] = int(trips)\n",
    "    return instance"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `traffic_realtime` table is updated in realtime using Cloud Pub/Sub and Dataflow so, if you run the cell below periodically, you should see the `traffic_last_5min` feature added to the instance and change over time. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "add_traffic_last_5min(instance={'dayofweek': 4,\n",
    "                                'hourofday': 13,\n",
    "                                'pickup_longitude': -73.99,\n",
    "                                'pickup_latitude': 40.758,\n",
    "                                'dropoff_latitude': 41.742,\n",
    "                                'dropoff_longitude': -73.07})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we'll use the python api to call predictions on an instance, using the realtime traffic information in our prediction. Just as above, you should notice that our resulting predicitons change with time as our realtime traffic information changes as well."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copy the `ENDPOINT_ID` from the deployment in the previous lab to the beginning of the block below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO 2b. Write code to call prediction on instance using realtime traffic info.\n",
    "#Hint: Look at this sample https://github.com/googleapis/python-aiplatform/blob/master/samples/snippets/predict_custom_trained_model_sample.py\n",
    "\n",
    "ENDPOINT_ID = # TODO: Copy the `ENDPOINT_ID` from the deployment in the previous lab.\n",
    "\n",
    "api_endpoint = f'{REGION}-aiplatform.googleapis.com'\n",
    "\n",
    "# The AI Platform services require regional API endpoints.\n",
    "client_options = {\"api_endpoint\": api_endpoint}\n",
    "# Initialize client that will be used to create and send requests.\n",
    "# This client only needs to be created once, and can be reused for multiple requests.\n",
    "client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)\n",
    "\n",
    "instance = {'dayofweek': 4,\n",
    "            'hourofday': 13,\n",
    "            'pickup_longitude': -73.99,\n",
    "            'pickup_latitude': 40.758,\n",
    "            'dropoff_latitude': 41.742,\n",
    "            'dropoff_longitude': -73.07}\n",
    "\n",
    "# The format of each instance should conform to the deployed model's prediction input schema.\n",
    "instance_dict = add_traffic_last_5min(instance)\n",
    "\n",
    "instance = json_format.ParseDict(instance, Value())\n",
    "instances = [instance]\n",
    "endpoint = client.endpoint_path(\n",
    "    project=PROJECT, location=REGION, endpoint=ENDPOINT_ID\n",
    ")\n",
    "response = client.predict(\n",
    "    endpoint=endpoint, instances=instances\n",
    ")\n",
    "\n",
    "# The predictions are a google.protobuf.Value representation of the model's predictions.\n",
    "print(\" prediction:\", response.predictions[0][0])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Cleanup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order to avoid ongoing charges, when you are finished with this lab, you can delete your Dataflow job of that job from the [Dataflow section of Cloud console](https://console.cloud.google.com/dataflow).\n",
    "\n",
    "An endpoint with a model deployed to it incurs ongoing charges, as there must be at least one replica defined (the `min-replica-count` parameter is at least 1). In order to stop incurring charges, you can click on the endpoint on the [Endpoints page of the Cloud Console](https://console.cloud.google.com/vertex-ai/endpoints) and un-deploy your model."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2021 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "tf2-gpu.2-3.m71",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-3:m71"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
